// 使用mqtt作为消息队列的支持
package xqueuemqtt

import (
	"errors"
	"strings"
	"time"

	"gitee.com/xiaoyutab/xgotool/individual/xqueue"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type Mqtts struct {
	Type     string      // mqtt类型 mqtt/mqtts/ws/wss/tcp/tls
	Addr     string      // mqtt地址
	User     string      // mqtt用户名
	Pass     string      // mqtt密码
	Topic    string      // mqtt主题
	ClientId string      // mqtt客户端ID
	Qos      byte        // mqtt服务质量
	Retain   bool        // mqtt消息是否需要持久化
	client   mqtt.Client // MQTT的客户端连接
	stop     chan bool   // 停止信号
}

// 打开MQTT协议的消息队列形式
func Open(cf *Mqtts) xqueue.Queue {
	if cf.Addr == "" {
		return nil
	}
	if cf.ClientId == "" {
		// 默认客户端ID格式：2006-01-02T15:04:05.999999999Z07:00
		cf.ClientId = "xqueue_mqtt_" + time.Now().Format(time.RFC3339Nano)
	}
	// 配置项调整
	if cf.Type == "" {
		cf.Type = strings.Split(cf.Addr, ":")[0]
		if cf.Type == "" {
			cf.Type = "tcp"
		}
	}
	cf.Type = strings.ToLower(cf.Type)
	// 判断是否在允许的选项中
	switch cf.Type {
	case "mqtt", "mqtts", "ws", "wss", "tcp", "tls":
	default:
		return nil
	}
	// 1. 连接mqtt服务器
	opts := mqtt.NewClientOptions().AddBroker(cf.Addr)
	// 2. 设置客户端ID
	opts.SetClientID(cf.ClientId)
	// 3. 设置用户名和密码
	if cf.User != "" && cf.Pass != "" {
		opts.SetUsername(cf.User)
		opts.SetPassword(cf.Pass)
	}
	// 4. 获取连接
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return nil
	}
	cf.client = client
	cf.stop = make(chan bool)
	return cf
}

// 获取原始MQTT协议连接
func (c *Mqtts) Client() mqtt.Client {
	return c.client
}

// 关闭MQTT协议
func (c *Mqtts) Close() error {
	if c.client == nil {
		return nil
	}
	c.client.Disconnect(250)
	c.client = nil
	return nil
}

// 发送MQTT消息
func (c *Mqtts) Set(dat []byte) error {
	if c.client == nil {
		return errors.New("mqtt 未连接")
	}
	token := c.client.Publish(c.Topic, c.Qos, c.Retain, dat)
	token.WaitTimeout(time.Second * 5)
	if token.Error() != nil {
		return token.Error()
	}
	return nil
}

// 发送延时消息
// 因MQTT协议的特性，无法实现延时消息，因此这里使用go语言的定时器来实现延时消息
func (c *Mqtts) SetDef(dat []byte, t time.Duration) error {
	if c.client == nil {
		return errors.New("mqtt 未连接")
	}
	go func() {
		time.Sleep(t)
		c.Set(dat)
	}()
	return nil
}

// 消息监听
func (c *Mqtts) Listen(f func(dat []byte) error) error {
	if c.client == nil {
		return errors.New("mqtt 未连接")
	}
	// 订阅消息
	token := c.client.Subscribe(c.Topic, c.Qos, func(client mqtt.Client, m mqtt.Message) {
		if m.Topic() == c.Topic {
			f(m.Payload())
		}
	})
	token.Wait()
	if token.Error() != nil {
		return token.Error()
	}
	// 在此处进行阻塞，避免程序退出
	for {
		select {
		case <-c.stop:
			return nil
		default:
			time.Sleep(time.Second)
		}
	}
}

// 停止监听
func (c *Mqtts) Stop() {
	if c.stop != nil {
		c.stop <- true
	}
}
